查看原文
其他

高性能消息中间件 NSQ 解析-应用实践

aoho aoho求索 2022-09-08

Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。

我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。

安装使用

在官网(https://nsq.io/overview/quick_start.html) 下载对应的二进制可执行文件。

# 启动nsqlookupd
$ nsqlookupd
# 启动 nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
# 启动 nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161

# 创建topic,发送消息
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
# 启动nsq_to_file
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
# 发布消息到 nsqd
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

在本地按照上述步骤就可以跑起来了。

创建生产者

安装好 nsq 的几个服务之后,我们来实现基于 nsq 的生产和消费示例。首先是创建生产者:

package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nsqio/go-nsq"
)

func main() {
  config := nsq.NewConfig()
  p, err := nsq.NewProducer("127.0.0.1:4150", config)

  if err != nil {
    log.Panic(err)
  }

  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("num-%d", i)
    log.Println("Pub:" + msg)
    err = p.Publish("testTopic", []byte(msg))
    if err != nil {
      log.Panic(err)
    }
    time.Sleep(time.Second * 1)
  }

  p.Stop()
}

生产者的逻辑比较简单,基于 nsq 官方提供的 github.com/nsqio/go-nsq包,通过调用,循环写 1000 个字符+数字,即 num-n 的形式,通过 p.Publish 发送到消息队列中,等待消费。

消费者

接着,我们创建消费者:consumer.go 来消费刚刚生产的消息。

package main

import (
  "log"
  "sync"

  "github.com/nsqio/go-nsq"
)

func main() {
  wg := &sync.WaitGroup{}
  wg.Add(1000)

  config := nsq.NewConfig()
  c, _ := nsq.NewConsumer("testTopic""ch", config)
  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
    log.Printf("Got a message: %s", message.Body)
    wg.Done()
    return nil
  }))

  // 1.直连nsqd
  // err := c.ConnectToNSQD("127.0.0.1:4150")

  // 2.通过 nsqlookupd 服务发现
  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
  if err != nil {
    log.Panic(err)
  }
  wg.Wait()
}

可通过两种方式与 nsqd 连接:

  • 直连 nsqd,适用于单机(standalone)版;
  • 通过 nsqlookupd 服务发现,适用于集群(cluster)版;

消费消息的动作,主要逻辑就是打印出来,实际业务中需要进行其他处理。

运行结果

依次启动生产者和消费者的服务,可以分别看到如下的输出结果:

$go run producer.go

2020/12/28 20:29:51 Pub:num-0
2020/12/28 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
2020/12/28 20:29:52 Pub:num-1
2020/12/28 20:29:53 Pub:num-2
2020/12/28 20:29:54 Pub:num-3
2020/12/28 20:29:55 Pub:num-4
2020/12/28 20:29:56 Pub:num-5
2020/12/28 20:29:57 Pub:num-6
2020/12/28 20:29:58 Pub:num-7
2020/12/28 20:29:59 Pub:num-8
2020/12/28 20:30:00 Pub:num-9
2020/12/28 20:30:01 Pub:num-10

$ go run consumer.go

2020/12/28 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2020/12/28 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2020/12/28 20:30:08 Got a message: num-0
2020/12/28 20:30:08 Got a message: num-1
2020/12/28 20:30:08 Got a message: num-2
2020/12/28 20:30:08 Got a message: num-3
2020/12/28 20:30:08 Got a message: num-4
2020/12/28 20:30:08 Got a message: num-5
2020/12/28 20:30:08 Got a message: num-6
2020/12/28 20:30:08 Got a message: num-7
2020/12/28 20:30:08 Got a message: num-8
2020/12/28 20:30:08 Got a message: num-9
2020/12/28 20:30:08 Got a message: num-10

通过如上的示例,我们已经成功地实现 NSQ 的应用。下面我们将解析 NSQ 的几个核心部分。

小结

本文主要介绍 nsq 的安装使用,下载好可执行文件之后,依次启动 nsqlookupd、nsqd、nsqadmin 几个服务。接着我们基于官方提供的客户端 API 包实现了生产消费模型的案例。通过简单的案例,我们能够对 nsq 的安装和基本使用有一个了解。

下一篇文章,将会具体分析 nsq 实现的细节。

推荐阅读

高性能消息中间件 nsq 解析-介绍

如何使用 Go 更好地开发并发程序,纯干货!

几款符合 OpenTracing 规范的分布式链路追踪组件介绍与选型

订阅最新文章,欢迎关注我的公众号



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存